《Netty实战

您所在的位置:网站首页 netty json 粘包 《Netty实战

《Netty实战

2023-09-07 15:19| 来源: 网络整理| 查看: 265

一、背景

拆包和粘包是在socket编程中经常出现的情况,在socket通讯过程中,如果通讯的一端一次性连续发送多条数据包,tcp协议会将多个数据包打包成一个tcp报文发送出去,这就是所谓的粘包。而如果通讯的一端发送的数据包超过一次tcp报文所能传输的最大值时,就会将一个数据包拆成多个最大tcp长度的tcp报文分开传输,这就叫做拆包。

 服务端接收到数据后,根据约定的传输协议(如MQTT),对数据做拆包粘包;

二、MQTT 2.1、简述

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

2.2、实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

2.3、数据包结构

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

三、自定义协议

MQTT协议中固定头部分占前两Byte,第一个Byte的0~3bits为标识位,4~7bits为类型,第二个Byte表示剩余数据的长度,当最后一位为1时,表示长度不足,需要使用二个字节继续保存。

基于MQTT协议,结合实际业务情况,0~3bits所能表示的范围,并不能区分所有数据类型,最终定义格式如下:

固定头(Fixed header)占前两个Byte,用于标识数据类型;可变头(Variable header)根据数据类型不同,数据内容(数据长度)不同,但其末尾的四个Byte用于标识剩余数据长度;消息体(Payload)个别消息类型,包含消息体

四、基于netty实现拆包粘包

netty封装了多种拆包粘包的实现,但实际业务里数据不固定(如长度标识位、数据偏移位,不同类型不一致),无法固化一种格式去解析数据,最终自己实现封装解析流程

记录缓冲当前指针位置,并从字节缓冲数组中取前两位字节,判断数据类型

in.markReaderIndex(); log.info("开始 in.size={}", in.readableBytes()); return dataHandle(in, 2, (body) -> { switch (body){ case 略 default: in.resetReaderIndex(); return null; } });

如果当前数据不足,则回滚指针

private ByteBuf dataHandle(ByteBuf in, int cutSize, Function function){ if(in.readableBytes() < cutSize){ in.resetReaderIndex(); return null; } byte[] sizeBytes = new byte[cutSize]; in.readBytes(sizeBytes, 0, cutSize); String body = HexUtil.encodeHexStr(sizeBytes).toUpperCase(); return function.apply(body); }

如果为消息类型,判断当前缓冲是否有足够的数据

public ByteBuf apTrajectory(String headerStr, ByteBuf in) { try { return dataHandle(in, 11, (size) -> { int len = Integer.parseInt(size.substring(18, 22), 16); return dataHandle(in, len, (body) -> Unpooled.wrappedBuffer(HexUtil.decodeHex(headerStr + size + body))); }); } catch (Exception e) { log.error("处理AP数据异常 error", e); } return null; }

实现ByteToMessageDecoder类的decode方法,如果未解析到数据,则继续等待

@Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { ByteBuf bb = doDecode(byteBuf); if(null != bb){ list.add(bb); } }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3